Appearance
LangGraph Store 集成方案
一、模块概述
| 属性 | 说明 |
|---|---|
| 模块名称 | Store(长期记忆) |
| 优先级 | 🟡 P2(中高) |
| 预估工时 | 2-3 天 |
| 依赖项 | langgraph.store, PostgresStore/RedisStore |
为什么需要
当前 Checkpointer 只能保存单次会话的状态,无法跨会话记住用户偏好。集成 Store 后可以:
- 跨会话记住用户偏好(语言、风格等)
- 实现个性化对话体验
- 积累用户相关知识
- 增强上下文理解能力
二、架构设计
2.1 Checkpointer vs Store 对比
┌─────────────────────────────────────────────────────────────────────┐
│ Checkpointer vs Store 对比 │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ Checkpointer (短期记忆) Store (长期记忆) │
│ ┌─────────────────┐ ┌─────────────────┐ │
│ │ 会话 A │ │ │ │
│ │ thread_id: 1 │ │ User 123 │ │
│ │ messages: [...]│ │ ┌───────────┐ │ │
│ └─────────────────┘ │ │preferences│ │ │
│ │ │ - 语言:中文│ │ │
│ ┌─────────────────┐ │ │ - 风格:简洁│ │ │
│ │ 会话 B │ │ └───────────┘ │ │
│ │ thread_id: 2 │ │ ┌───────────┐ │ │
│ │ messages: [...]│ │ │ memories │ │ │
│ └─────────────────┘ │ │ - 喜欢猫 │ │ │
│ │ │ - 程序员 │ │ │
│ ⚠️ 会话之间隔离 │ └───────────┘ │ │
│ 无法共享信息 │ │ │
│ │ ✅ 跨会话共享 │ │
│ └─────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘2.2 Store 数据模型
python
# Store 使用 namespace 组织数据
# namespace 格式: (category, user_id, sub_category)
# 示例数据结构:
namespace = ("memories", "user_123", "preferences")
# 存储键值对
store.put(namespace, "language", {"value": "中文", "confidence": 0.9})
store.put(namespace, "style", {"value": "简洁", "confidence": 0.8})
namespace = ("memories", "user_123", "facts")
store.put(namespace, str(uuid.uuid4()), {"text": "用户是一名程序员", "source": "对话中提到"})
store.put(namespace, str(uuid.uuid4()), {"text": "用户喜欢猫", "source": "用户明确说明"})2.3 状态设计
python
from typing import TypedDict, Optional, List, Any
from dataclasses import dataclass
from langgraph.graph import MessagesState
from langgraph.runtime import Runtime
@dataclass
class UserContext:
"""用户上下文"""
user_id: str
nickname: Optional[str] = None
email: Optional[str] = None
class StateWithMemory(MessagesState):
"""带记忆的状态"""
# 继承 messages
user_id: str
relevant_memories: Optional[List[str]] = None三、代码实现
3.1 Store 初始化
修改文件: services/checkpointer.py,添加 Store 初始化:
python
"""Store 初始化 - 在现有 checkpointer.py 中添加"""
from contextlib import contextmanager
import os
from dotenv import load_dotenv
load_dotenv(override=True)
def get_mysql_connection_string() -> str:
"""获取 MySQL 连接字符串"""
host = os.getenv("MYSQL_HOST", "localhost")
port = os.getenv("MYSQL_PORT", "3306")
user = os.getenv("MYSQL_USER", "root")
password = os.getenv("MYSQL_PASSWORD", "root")
database = os.getenv("MYSQL_DATABASE", "langchain_chat")
return f"mysql+pymysql://{user}:{password}@{host}:{port}/{database}?charset=utf8mb4"
@contextmanager
def get_checkpointer():
"""获取 MySQL Checkpointer"""
from langgraph.checkpoint.mysql.pymysql import PyMySQLSaver
conn_string = get_mysql_connection_string()
with PyMySQLSaver.from_conn_string(conn_string) as checkpointer:
checkpointer.setup()
yield checkpointer
@contextmanager
def get_async_checkpointer():
"""获取异步 MySQL Checkpointer"""
from langgraph.checkpoint.mysql.asyncmy import AsyncPyMySQLSaver
conn_string = get_mysql_connection_string()
with AsyncPyMySQLSaver.from_conn_string(conn_string) as checkpointer:
checkpointer.setup()
yield checkpointer
# ============ 新增 Store 相关函数 ============
@contextmanager
def get_store():
"""
获取 Store 上下文管理器
使用 PostgreSQL 作为长期记忆存储(推荐)
需要: pip install langgraph-checkpoint-postgres
Example:
with get_store() as store:
memories = store.search(("memories", user_id))
"""
# 使用 PostgreSQL Store(推荐,支持语义搜索)
# from langgraph.store.postgres import PostgresStore
# from langchain.embeddings import init_embeddings
# 简化版:使用内存 Store(开发测试用)
from langgraph.store.memory import InMemoryStore
store = InMemoryStore()
yield store
@contextmanager
def get_postgres_store():
"""
获取 PostgreSQL Store(生产环境推荐)
支持语义搜索,需要安装:
pip install langgraph-checkpoint-postgres
配置环境变量:
POSTGRES_HOST=localhost
POSTGRES_PORT=5432
POSTGRES_USER=postgres
POSTGRES_PASSWORD=postgres
POSTGRES_DATABASE=langgraph_store
"""
from langgraph.store.postgres import PostgresStore
from langchain.embeddings import init_embeddings
host = os.getenv("POSTGRES_HOST", "localhost")
port = os.getenv("POSTGRES_PORT", "5432")
user = os.getenv("POSTGRES_USER", "postgres")
password = os.getenv("POSTGRES_PASSWORD", "postgres")
database = os.getenv("POSTGRES_DATABASE", "langgraph_store")
conn_string = f"postgresql://{user}:{password}@{host}:{port}/{database}"
# 可选:启用语义搜索
embeddings = init_embeddings("openai:text-embedding-3-small")
with PostgresStore.from_conn_string(
conn_string,
index={"embed": embeddings, "dims": 1536}
) as store:
store.setup()
yield store3.2 记忆管理服务
创建文件: services/memory_store.py
python
"""用户长期记忆管理服务
提供用户偏好的存储、检索和管理功能。
"""
import logging
import uuid
from typing import Optional, List, Dict, Any
from dataclasses import dataclass
logger = logging.getLogger(__name__)
@dataclass
class MemoryItem:
"""记忆项"""
key: str
value: Dict[str, Any]
namespace: tuple
class MemoryService:
"""记忆管理服务"""
def __init__(self):
self._store = None
def _get_namespace(self, user_id: str, category: str = "facts") -> tuple:
"""获取命名空间"""
return ("memories", user_id, category)
def store_preference(
self,
user_id: str,
preference_type: str,
value: Any,
confidence: float = 1.0
) -> None:
"""
存储用户偏好
Args:
user_id: 用户 ID
preference_type: 偏好类型(language, style, topic 等)
value: 偏好值
confidence: 置信度 (0-1)
"""
from services.checkpointer import get_store
with get_store() as store:
namespace = self._get_namespace(user_id, "preferences")
store.put(
namespace,
preference_type,
{"value": value, "confidence": confidence}
)
logger.info(f"存储用户偏好: {user_id}/{preference_type} = {value}")
def get_preference(self, user_id: str, preference_type: str) -> Optional[Any]:
"""获取用户偏好"""
from services.checkpointer import get_store
with get_store() as store:
namespace = self._get_namespace(user_id, "preferences")
try:
item = store.get(namespace, preference_type)
return item.value if item else None
except Exception as e:
logger.warning(f"获取偏好失败: {e}")
return None
def store_fact(
self,
user_id: str,
fact: str,
source: str = "conversation"
) -> None:
"""
存储用户事实信息
Args:
user_id: 用户 ID
fact: 事实内容(如"用户是一名程序员")
source: 来源(conversation, explicit 等)
"""
from services.checkpointer import get_store
with get_store() as store:
namespace = self._get_namespace(user_id, "facts")
store.put(
namespace,
str(uuid.uuid4()),
{"text": fact, "source": source}
)
logger.info(f"存储用户事实: {user_id} - {fact[:50]}...")
def search_memories(
self,
user_id: str,
query: str,
limit: int = 5
) -> List[MemoryItem]:
"""
搜索用户相关记忆
Args:
user_id: 用户 ID
query: 搜索查询
limit: 返回数量限制
Returns:
匹配的记忆列表
"""
from services.checkpointer import get_store
with get_store() as store:
namespace = self._get_namespace(user_id, "facts")
try:
items = store.search(namespace, query=query, limit=limit)
return [
MemoryItem(key=item.key, value=item.value, namespace=namespace)
for item in items
]
except Exception as e:
logger.warning(f"搜索记忆失败: {e}")
return []
def get_all_preferences(self, user_id: str) -> Dict[str, Any]:
"""获取用户所有偏好"""
from services.checkpointer import get_store
with get_store() as store:
namespace = self._get_namespace(user_id, "preferences")
try:
# 获取命名空间下所有项目
items = store.search(namespace, limit=100)
return {
item.key: item.value.get("value")
for item in items
}
except Exception as e:
logger.warning(f"获取偏好失败: {e}")
return {}
def build_memory_context(self, user_id: str, query: str) -> str:
"""
构建记忆上下文
用于在系统提示中添加用户相关信息
Args:
user_id: 用户 ID
query: 当前查询(用于语义搜索)
Returns:
格式化的记忆上下文字符串
"""
preferences = self.get_all_preferences(user_id)
relevant_facts = self.search_memories(user_id, query, limit=3)
context_parts = []
# 添加偏好
if preferences:
pref_str = ", ".join(f"{k}: {v}" for k, v in preferences.items())
context_parts.append(f"用户偏好: {pref_str}")
# 添加相关事实
if relevant_facts:
facts_str = "\n".join(f"- {f.value.get('text', '')}" for f in relevant_facts)
context_parts.append(f"关于用户的信息:\n{facts_str}")
if context_parts:
return "\n\n".join(context_parts)
return ""
# 全局实例
_memory_service: Optional[MemoryService] = None
def get_memory_service() -> MemoryService:
"""获取记忆服务单例"""
global _memory_service
if _memory_service is None:
_memory_service = MemoryService()
return _memory_service3.3 带 Store 的 Agent
修改 services/langgraph_chat.py,添加 Store 支持:
python
"""带 Store 支持的聊天服务
在原有基础上添加长期记忆功能。
"""
import os
import logging
from typing import Optional, Iterator, Dict, Any, List
from dataclasses import dataclass, field
from dotenv import load_dotenv
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_openai import ChatOpenAI
from langgraph.graph import START, StateGraph, MessagesState
from services.memory_store import get_memory_service
load_dotenv(override=True)
logger = logging.getLogger(__name__)
@dataclass
class ChatConfig:
"""聊天配置"""
api_key: str = field(default_factory=lambda: os.getenv("OPENROUTER_API_KEY", ""))
base_url: str = field(default_factory=lambda: os.getenv("OPENROUTER_BASE_URL", "https://openrouter.ai/api/v1"))
default_model: str = field(default_factory=lambda: os.getenv("OPENROUTER_MODEL", "openai/gpt-4o"))
default_system_prompt: str = "你是一个友好的 AI 助手,请用中文回复。"
class MemoryEnabledChatService:
"""带长期记忆的聊天服务"""
def __init__(self, config: Optional[ChatConfig] = None):
self.config = config or ChatConfig()
self.memory_service = get_memory_service()
def _get_llm(self, model: Optional[str] = None, temperature: float = 0.7) -> ChatOpenAI:
"""获取 LLM 实例"""
return ChatOpenAI(
model=model or self.config.default_model,
api_key=self.config.api_key,
base_url=self.config.base_url,
temperature=temperature
)
def _call_model_with_memory(
self,
state: MessagesState,
model: Optional[str] = None,
system_prompt: Optional[str] = None,
user_id: Optional[str] = None
):
"""调用模型(带记忆上下文)"""
llm = self._get_llm(model)
# 构建记忆上下文
memory_context = ""
if user_id:
query = str(state["messages"][-1].content) if state["messages"] else ""
memory_context = self.memory_service.build_memory_context(user_id, query)
# 检测是否需要存储新记忆
self._maybe_store_memory(user_id, state)
# 构建系统提示
if memory_context:
enhanced_system_prompt = f"""{system_prompt or self.config.default_system_prompt}
## 用户相关信息
{memory_context}
请根据用户的偏好和历史信息提供个性化的回答。"""
else:
enhanced_system_prompt = system_prompt or self.config.default_system_prompt
# 调用模型
messages = [
SystemMessage(content=enhanced_system_prompt),
*state["messages"]
]
response = llm.invoke(messages)
return {"messages": [response]}
def _maybe_store_memory(self, user_id: str, state: MessagesState) -> None:
"""检测并存储新记忆"""
if not state["messages"]:
return
last_message = state["messages"][-1]
content = str(last_message.content).lower()
# 检测用户声明偏好
preference_patterns = {
"language": ["我喜欢用中文", "请用中文", "用中文回复", "我习惯用英文"],
"style": ["简洁一点", "详细一点", "简短回答", "详细解释"],
}
for pref_type, patterns in preference_patterns.items():
for pattern in patterns:
if pattern in content:
value = "中文" if "中文" in pattern else "简洁" if "简洁" in pattern or "简短" in pattern else "详细"
self.memory_service.store_preference(user_id, pref_type, value)
logger.info(f"自动检测到用户偏好: {pref_type} = {value}")
return
# 检测用户声明事实
fact_keywords = ["我是", "我叫", "我住在", "我在", "我喜欢", "我的工作是", "我是做"]
for keyword in fact_keywords:
if keyword in content:
# 提取事实(简化版,实际可以用 LLM 提取)
self.memory_service.store_fact(user_id, last_message.content, source="explicit")
return
def chat_stream_with_memory(
self,
thread_id: str,
prompt: str,
user_id: Optional[str] = None,
model: Optional[str] = None,
system_prompt: Optional[str] = None,
images: Optional[List[str]] = None,
temperature: float = 0.7,
**kwargs
) -> Iterator[str]:
"""
流式对话(带长期记忆)
Args:
thread_id: 会话 ID
prompt: 用户输入
user_id: 用户 ID(用于检索长期记忆)
model: 模型名称
system_prompt: 系统提示
images: 图片列表
temperature: 温度参数
"""
from services.checkpointer import get_checkpointer
# 构建消息内容
content = prompt
if images:
# 处理图片逻辑...
pass
with get_checkpointer() as checkpointer:
# 构建带记忆的工作流
def call_model(state: MessagesState):
return self._call_model_with_memory(
state, model, system_prompt, user_id
)
workflow = StateGraph(MessagesState)
workflow.add_node("agent", call_model)
workflow.add_edge(START, "agent")
app = workflow.compile(checkpointer=checkpointer)
config = {"configurable": {"thread_id": thread_id}}
for chunk in app.stream(
{"messages": [HumanMessage(content=content)]},
config,
stream_mode="messages"
):
if isinstance(chunk, tuple) and len(chunk) == 2:
message_chunk, metadata = chunk
if hasattr(message_chunk, 'content') and message_chunk.content:
yield message_chunk.content
# 全局服务实例
_memory_chat_service: Optional[MemoryEnabledChatService] = None
def get_memory_chat_service() -> MemoryEnabledChatService:
"""获取带记忆的聊天服务单例"""
global _memory_chat_service
if _memory_chat_service is None:
_memory_chat_service = MemoryEnabledChatService()
return _memory_chat_service四、API 集成
4.1 修改现有 API
修改 api/chat.py,在聊天时传入用户 ID:
python
from services.langgraph_chat import get_memory_chat_service
@router.post("/chat/stream")
async def chat_stream(
data: ChatRequest,
request: Request,
db: Session = Depends(get_db)
):
"""流式聊天接口(带长期记忆)"""
session = get_session(request)
user_id = session.get("user", {}).get("user_id")
# ... 其他代码 ...
# 使用带记忆的服务
chat_service = get_memory_chat_service()
for chunk in chat_service.chat_stream_with_memory(
thread_id=thread_id or "anonymous",
prompt=data.prompt,
user_id=user_id, # 传入用户 ID
model=data.model,
system_prompt=data.system_prompt,
images=llm_image_urls,
user_info=user_info
):
yield chunk4.2 新增记忆管理 API
创建 api/memory.py:
python
"""记忆管理 API"""
from fastapi import APIRouter, Depends, Request
from sqlalchemy.orm import Session
from typing import Optional, List
from core.database import get_db
from middlewares.auth import get_session
from services.memory_store import get_memory_service
router = APIRouter(prefix="/api/memory", tags=["memory"])
@router.get("/preferences")
async def get_preferences(request: Request):
"""获取用户偏好"""
session = get_session(request)
user_id = session.get("user", {}).get("user_id")
if not user_id:
return {"success": False, "error": "未登录"}
memory_service = get_memory_service()
preferences = memory_service.get_all_preferences(user_id)
return {"success": True, "preferences": preferences}
@router.put("/preferences/{preference_type}")
async def update_preference(
preference_type: str,
value: str,
request: Request
):
"""更新用户偏好"""
session = get_session(request)
user_id = session.get("user", {}).get("user_id")
if not user_id:
return {"success": False, "error": "未登录"}
memory_service = get_memory_service()
memory_service.store_preference(user_id, preference_type, value)
return {"success": True, "preference_type": preference_type, "value": value}
@router.get("/facts")
async def get_facts(request: Request, limit: int = 10):
"""获取用户事实信息"""
session = get_session(request)
user_id = session.get("user", {}).get("user_id")
if not user_id:
return {"success": False, "error": "未登录"}
memory_service = get_memory_service()
facts = memory_service.search_memories(user_id, "", limit=limit)
return {
"success": True,
"facts": [
{"key": f.key, "text": f.value.get("text"), "source": f.value.get("source")}
for f in facts
]
}
@router.delete("/facts/{fact_key}")
async def delete_fact(fact_key: str, request: Request):
"""删除用户事实"""
session = get_session(request)
user_id = session.get("user", {}).get("user_id")
if not user_id:
return {"success": False, "error": "未登录"}
# TODO: 实现删除逻辑
return {"success": True}五、前端集成
5.1 偏好设置组件
添加到 static/js/settings.js:
javascript
class UserPreferences {
constructor() {
this.preferences = {};
this.init();
}
async init() {
await this.loadPreferences();
this.render();
}
async loadPreferences() {
try {
const response = await fetch('/api/memory/preferences');
const data = await response.json();
if (data.success) {
this.preferences = data.preferences;
}
} catch (error) {
console.error('加载偏好失败:', error);
}
}
async setPreference(type, value) {
try {
const response = await fetch(`/api/memory/preferences/${type}?value=${encodeURIComponent(value)}`, {
method: 'PUT'
});
const data = await response.json();
if (data.success) {
this.preferences[type] = value;
this.showToast(`已保存偏好: ${type} = ${value}`);
}
} catch (error) {
console.error('保存偏好失败:', error);
}
}
render() {
// 在设置面板中渲染偏好选项
const container = document.getElementById('preferences-container');
if (!container) return;
container.innerHTML = `
<div class="preference-section">
<h4>对话偏好</h4>
<div class="preference-item">
<label>回复语言</label>
<select id="pref-language" onchange="userPrefs.setPreference('language', this.value)">
<option value="中文" ${this.preferences.language === '中文' ? 'selected' : ''}>中文</option>
<option value="英文" ${this.preferences.language === '英文' ? 'selected' : ''}>英文</option>
</select>
</div>
<div class="preference-item">
<label>回复风格</label>
<select id="pref-style" onchange="userPrefs.setPreference('style', this.value)">
<option value="简洁" ${this.preferences.style === '简洁' ? 'selected' : ''}>简洁</option>
<option value="详细" ${this.preferences.style === '详细' ? 'selected' : ''}>详细</option>
</select>
</div>
</div>
<div class="preference-section">
<h4>关于我</h4>
<p class="hint">AI 会记住这些信息,提供个性化回答</p>
<div id="user-facts"></div>
</div>
`;
this.loadFacts();
}
async loadFacts() {
try {
const response = await fetch('/api/memory/facts?limit=5');
const data = await response.json();
if (data.success) {
const factsContainer = document.getElementById('user-facts');
factsContainer.innerHTML = data.facts.map(f => `
<div class="fact-item">
<span>${f.text}</span>
<button onclick="userPrefs.deleteFact('${f.key}')" class="btn-delete">×</button>
</div>
`).join('');
}
} catch (error) {
console.error('加载事实失败:', error);
}
}
showToast(message) {
// 显示提示消息
const toast = document.createElement('div');
toast.className = 'toast';
toast.textContent = message;
document.body.appendChild(toast);
setTimeout(() => toast.remove(), 3000);
}
}
const userPrefs = new UserPreferences();六、数据库准备
6.1 PostgreSQL Store 表(如使用 PostgreSQL)
sql
-- Store 表会自动创建,以下为参考结构
CREATE TABLE IF NOT EXISTS store (
id SERIAL PRIMARY KEY,
namespace VARCHAR(255) NOT NULL,
key VARCHAR(255) NOT NULL,
value JSONB NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE(namespace, key)
);
CREATE INDEX idx_store_namespace ON store(namespace);七、测试计划
7.1 单元测试
python
# tests/test_memory_store.py
import pytest
from services.memory_store import MemoryService
def test_store_preference():
"""测试存储偏好"""
service = MemoryService()
service.store_preference("user_123", "language", "中文")
value = service.get_preference("user_123", "language")
assert value == "中文"
def test_store_fact():
"""测试存储事实"""
service = MemoryService()
service.store_fact("user_123", "用户是一名程序员")
facts = service.search_memories("user_123", "程序员")
assert len(facts) > 0
assert "程序员" in facts[0].value["text"]
def test_build_memory_context():
"""测试构建记忆上下文"""
service = MemoryService()
service.store_preference("user_123", "language", "中文")
service.store_fact("user_123", "用户喜欢猫")
context = service.build_memory_context("user_123", "猫")
assert "中文" in context
assert "猫" in context八、实施步骤
步骤 1: Store 基础设施(0.5 天)
- 修改
services/checkpointer.py,添加 Store 初始化 - 配置 PostgreSQL 或使用内存 Store
- 测试 Store 连接
步骤 2: 记忆管理服务(1 天)
- 创建
services/memory_store.py - 实现偏好存储和检索
- 实现事实存储和语义搜索
- 编写单元测试
步骤 3: Agent 集成(0.5 天)
- 修改
services/langgraph_chat.py - 添加记忆上下文构建
- 实现自动记忆提取
步骤 4: API 集成(0.5 天)
- 创建
api/memory.py - 修改
api/chat.py传入用户 ID - 测试 API
步骤 5: 前端集成(0.5 天)
- 创建偏好设置组件
- 集成到设置页面
- 测试完整流程